// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package fetcher

import (
	"errors"
	"fmt"
	"math"
	mrand "math/rand"
	"sort"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/gopool"
	"github.com/ethereum/go-ethereum/common/lru"
	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/core/txpool"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

const (
	// maxTxAnnounces is the maximum number of unique transactions a peer
	// can announce in a short time.
	maxTxAnnounces = 4096

	// maxTxRetrievals is the maximum number of transactions that can be fetched
	// in one request. The rationale for picking 256 is to have a reasonabe lower
	// bound for the transferred data (don't waste RTTs, transfer more meaningful
	// batch sizes), but also have an upper bound on the sequentiality to allow
	// using our entire peerset for deliveries.
	//
	// This number also acts as a failsafe against malicious announces which might
	// cause us to request more data than we'd expect.
	maxTxRetrievals = 256

	// maxTxRetrievalSize is the max number of bytes that delivered transactions
	// should weigh according to the announcements. The 128KB was chosen to limit
	// retrieving a maximum of one blob transaction at a time to minimize hogging
	// a connection between two peers.
	maxTxRetrievalSize = 128 * 1024

	// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
	// is used to track recent transactions that have been dropped so we don't
	// re-request them.
	maxTxUnderpricedSetSize = 32768

	// maxTxUnderpricedTimeout is the max time a transaction should be stuck in the underpriced set.
	maxTxUnderpricedTimeout = 5 * time.Minute

	// txArriveTimeout is the time allowance before an announced transaction is
	// explicitly requested.
	txArriveTimeout = 500 * time.Millisecond

	// txGatherSlack is the interval used to collate almost-expired announces
	// with network fetches.
	txGatherSlack = 100 * time.Millisecond

	// addTxsBatchSize it the max number of transactions to add in a single batch from a peer.
	addTxsBatchSize = 128
)

var (
	// txFetchTimeout is the maximum allotted time to return an explicitly
	// requested transaction.
	txFetchTimeout = 5 * time.Second
)

var (
	txAnnounceInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil)
	txAnnounceKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil)
	txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil)
	txAnnounceDOSMeter         = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil)

	txBroadcastInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/in", nil)
	txBroadcastKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/known", nil)
	txBroadcastUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/underpriced", nil)
	txBroadcastOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/otherreject", nil)

	txRequestOutMeter     = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/out", nil)
	txRequestFailMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/fail", nil)
	txRequestDoneMeter    = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/done", nil)
	txRequestTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/request/timeout", nil)

	txReplyInMeter          = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/in", nil)
	txReplyKnownMeter       = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/known", nil)
	txReplyUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/underpriced", nil)
	txReplyOtherRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/replies/otherreject", nil)

	txFetcherWaitingPeers   = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil)
	txFetcherWaitingHashes  = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil)
	txFetcherQueueingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil)
	txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
	txFetcherFetchingPeers  = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
	txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)
)

var errTerminated = errors.New("terminated")

// txAnnounce is the notification of the availability of a batch
// of new transactions in the network.
type txAnnounce struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes being announced
	metas  []txMetadata  // Batch of metadata associated with the hashes
}

// txMetadata provides the extra data transmitted along with the announcement
// for better fetch scheduling.
type txMetadata struct {
	kind byte   // Transaction consensus type
	size uint32 // Transaction size in bytes
}

// txMetadataWithSeq is a wrapper of transaction metadata with an extra field
// tracking the transaction sequence number.
type txMetadataWithSeq struct {
	txMetadata
	seq uint64
}

// txRequest represents an in-flight transaction retrieval request destined to
// a specific peers.
type txRequest struct {
	hashes []common.Hash            // Transactions having been requested
	stolen map[common.Hash]struct{} // Deliveries by someone else (don't re-request)
	time   mclock.AbsTime           // Timestamp of the request
}

// txDelivery is the notification that a batch of transactions have been added
// to the pool and should be untracked.
type txDelivery struct {
	origin string        // Identifier of the peer originating the notification
	hashes []common.Hash // Batch of transaction hashes having been delivered
	metas  []txMetadata  // Batch of metadata associated with the delivered hashes
	direct bool          // Whether this is a direct reply or a broadcast
}

// txDrop is the notification that a peer has disconnected.
type txDrop struct {
	peer string
}

// TxFetcher is responsible for retrieving new transaction based on announcements.
//
// The fetcher operates in 3 stages:
//   - Transactions that are newly discovered are moved into a wait list.
//   - After ~500ms passes, transactions from the wait list that have not been
//     broadcast to us in whole are moved into a queueing area.
//   - When a connected peer doesn't have in-flight retrieval requests, any
//     transaction queued up (and announced by the peer) are allocated to the
//     peer and moved into a fetching status until it's fulfilled or fails.
//
// The invariants of the fetcher are:
//   - Each tracked transaction (hash) must only be present in one of the
//     three stages. This ensures that the fetcher operates akin to a finite
//     state automata and there's no data leak.
//   - Each peer that announced transactions may be scheduled retrievals, but
//     only ever one concurrently. This ensures we can immediately know what is
//     missing from a reply and reschedule it.
type TxFetcher struct {
	notify  chan *txAnnounce
	cleanup chan *txDelivery
	drop    chan *txDrop
	quit    chan struct{}

	txSeq       uint64                             // Unique transaction sequence number
	underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)

	// Stage 1: Waiting lists for newly discovered transactions that might be
	// broadcast without needing explicit request/reply round trips.
	waitlist  map[common.Hash]map[string]struct{}           // Transactions waiting for an potential broadcast
	waittime  map[common.Hash]mclock.AbsTime                // Timestamps when transactions were added to the waitlist
	waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)

	// Stage 2: Queue of transactions that waiting to be allocated to some peer
	// to be retrieved directly.
	announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
	announced map[common.Hash]map[string]struct{}           // Set of download locations, grouped by transaction hash

	// Stage 3: Set of transactions currently being retrieved, some which may be
	// fulfilled and some rescheduled. Note, this step shares 'announces' from the
	// previous stage to avoid having to duplicate (need it for DoS checks).
	fetching   map[common.Hash]string              // Transaction set currently being retrieved
	requests   map[string]*txRequest               // In-flight transaction retrievals
	alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails

	// Callbacks
	hasTx    func(common.Hash) bool                     // Retrieves a tx from the local txpool
	addTxs   func(string, []*types.Transaction) []error // Insert a batch of transactions into local txpool
	fetchTxs func(string, []common.Hash) error          // Retrieves a set of txs from a remote peer
	dropPeer func(string)                               // Drops a peer in case of announcement violation

	step     chan struct{}    // Notification channel when the fetcher loop iterates
	clock    mclock.Clock     // Monotonic clock or simulated clock for tests
	realTime func() time.Time // Real system time or simulated time for tests
	rand     *mrand.Rand      // Randomizer to use in tests instead of map range loops (soft-random)
}

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
	return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
	hasTx func(common.Hash) bool, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
	clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
	return &TxFetcher{
		notify:      make(chan *txAnnounce),
		cleanup:     make(chan *txDelivery),
		drop:        make(chan *txDrop),
		quit:        make(chan struct{}),
		waitlist:    make(map[common.Hash]map[string]struct{}),
		waittime:    make(map[common.Hash]mclock.AbsTime),
		waitslots:   make(map[string]map[common.Hash]*txMetadataWithSeq),
		announces:   make(map[string]map[common.Hash]*txMetadataWithSeq),
		announced:   make(map[common.Hash]map[string]struct{}),
		fetching:    make(map[common.Hash]string),
		requests:    make(map[string]*txRequest),
		alternates:  make(map[common.Hash]map[string]struct{}),
		underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
		hasTx:       hasTx,
		addTxs:      addTxs,
		fetchTxs:    fetchTxs,
		dropPeer:    dropPeer,
		clock:       clock,
		realTime:    realTime,
		rand:        rand,
	}
}

// Notify announces the fetcher of the potential availability of a new batch of
// transactions in the network.
func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []common.Hash) error {
	// Keep track of all the announced transactions
	txAnnounceInMeter.Mark(int64(len(hashes)))

	// Skip any transaction announcements that we already know of, or that we've
	// previously marked as cheap and discarded. This check is of course racy,
	// because multiple concurrent notifies will still manage to pass it, but it's
	// still valuable to check here because it runs concurrent  to the internal
	// loop, so anything caught here is time saved internally.
	var (
		unknownHashes = make([]common.Hash, 0, len(hashes))
		unknownMetas  = make([]txMetadata, 0, len(hashes))

		duplicate   int64
		underpriced int64
	)
	for i, hash := range hashes {
		switch {
		case f.hasTx(hash):
			duplicate++
		case f.isKnownUnderpriced(hash):
			underpriced++
		default:
			unknownHashes = append(unknownHashes, hash)

			// Transaction metadata has been available since eth68, and all
			// legacy eth protocols (prior to eth68) have been deprecated.
			// Therefore, metadata is always expected in the announcement.
			unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]})
		}
	}
	txAnnounceKnownMeter.Mark(duplicate)
	txAnnounceUnderpricedMeter.Mark(underpriced)

	// If anything's left to announce, push it into the internal loop
	if len(unknownHashes) == 0 {
		return nil
	}
	announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas}
	select {
	case f.notify <- announce:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// isKnownUnderpriced reports whether a transaction hash was recently found to be underpriced.
func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool {
	prevTime, ok := f.underpriced.Peek(hash)
	if ok && prevTime.Before(f.realTime().Add(-maxTxUnderpricedTimeout)) {
		f.underpriced.Remove(hash)
		return false
	}
	return ok
}

// Enqueue imports a batch of received transaction into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
	var (
		inMeter          = txReplyInMeter
		knownMeter       = txReplyKnownMeter
		underpricedMeter = txReplyUnderpricedMeter
		otherRejectMeter = txReplyOtherRejectMeter
	)
	if !direct {
		inMeter = txBroadcastInMeter
		knownMeter = txBroadcastKnownMeter
		underpricedMeter = txBroadcastUnderpricedMeter
		otherRejectMeter = txBroadcastOtherRejectMeter
	}
	// Keep track of all the propagated transactions
	inMeter.Mark(int64(len(txs)))

	// Push all the transactions into the pool, tracking underpriced ones to avoid
	// re-requesting them and dropping the peer in case of malicious transfers.
	var (
		added = make([]common.Hash, 0, len(txs))
		metas = make([]txMetadata, 0, len(txs))
	)
	// proceed in batches
	for i := 0; i < len(txs); i += addTxsBatchSize {
		end := i + addTxsBatchSize
		if end > len(txs) {
			end = len(txs)
		}
		var (
			duplicate   int64
			underpriced int64
			otherreject int64
		)
		batch := txs[i:end]

		for j, err := range f.addTxs(peer, batch) {
			// Track the transaction hash if the price is too low for us.
			// Avoid re-request this transaction when we receive another
			// announcement.
			if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow) {
				f.underpriced.Add(batch[j].Hash(), batch[j].Time())
			}
			// Track a few interesting failure types
			switch {
			case err == nil: // Noop, but need to handle to not count these

			case errors.Is(err, txpool.ErrAlreadyKnown):
				duplicate++

			case errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow):
				underpriced++

			default:
				otherreject++
			}
			added = append(added, batch[j].Hash())
			metas = append(metas, txMetadata{
				kind: batch[j].Type(),
				size: uint32(batch[j].Size()),
			})
		}
		knownMeter.Mark(duplicate)
		underpricedMeter.Mark(underpriced)
		otherRejectMeter.Mark(otherreject)

		// If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
		if otherreject > addTxsBatchSize/4 {
			time.Sleep(200 * time.Millisecond)
			log.Debug("Peer delivering stale transactions", "peer", peer, "rejected", otherreject)
		}
	}
	select {
	case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Drop should be called when a peer disconnects. It cleans up all the internal
// data structures of the given node.
func (f *TxFetcher) Drop(peer string) error {
	select {
	case f.drop <- &txDrop{peer: peer}:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *TxFetcher) Start() {
	go f.loop()
}

// Stop terminates the announcement based synchroniser, canceling all pending
// operations.
func (f *TxFetcher) Stop() {
	close(f.quit)
}

func (f *TxFetcher) loop() {
	var (
		waitTimer    = new(mclock.Timer)
		timeoutTimer = new(mclock.Timer)

		waitTrigger    = make(chan struct{}, 1)
		timeoutTrigger = make(chan struct{}, 1)
	)
	for {
		select {
		case ann := <-f.notify:
			// Drop part of the new announcements if there are too many accumulated.
			// Note, we could but do not filter already known transactions here as
			// the probability of something arriving between this call and the pre-
			// filter outside is essentially zero.
			used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin])
			if used >= maxTxAnnounces {
				// This can happen if a set of transactions are requested but not
				// all fulfilled, so the remainder are rescheduled without the cap
				// check. Should be fine as the limit is in the thousands and the
				// request size in the hundreds.
				txAnnounceDOSMeter.Mark(int64(len(ann.hashes)))
				break
			}
			want := used + len(ann.hashes)
			if want > maxTxAnnounces {
				txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))

				ann.hashes = ann.hashes[:maxTxAnnounces-used]
				ann.metas = ann.metas[:maxTxAnnounces-used]
			}
			// All is well, schedule the remainder of the transactions
			var (
				idleWait   = len(f.waittime) == 0
				_, oldPeer = f.announces[ann.origin]
				hasBlob    bool

				// nextSeq returns the next available sequence number for tagging
				// transaction announcement and also bump it internally.
				nextSeq = func() uint64 {
					seq := f.txSeq
					f.txSeq++
					return seq
				}
			)
			for i, hash := range ann.hashes {
				// If the transaction is already downloading, add it to the list
				// of possible alternates (in case the current retrieval fails) and
				// also account it for the peer.
				if f.alternates[hash] != nil {
					f.alternates[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = &txMetadataWithSeq{
							txMetadata: ann.metas[i],
							seq:        nextSeq(),
						}
					} else {
						f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
							hash: {
								txMetadata: ann.metas[i],
								seq:        nextSeq(),
							},
						}
					}
					continue
				}
				// If the transaction is not downloading, but is already queued
				// from a different peer, track it for the new peer too.
				if f.announced[hash] != nil {
					f.announced[hash][ann.origin] = struct{}{}

					// Stage 2 and 3 share the set of origins per tx
					if announces := f.announces[ann.origin]; announces != nil {
						announces[hash] = &txMetadataWithSeq{
							txMetadata: ann.metas[i],
							seq:        nextSeq(),
						}
					} else {
						f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
							hash: {
								txMetadata: ann.metas[i],
								seq:        nextSeq(),
							},
						}
					}
					continue
				}
				// If the transaction is already known to the fetcher, but not
				// yet downloading, add the peer as an alternate origin in the
				// waiting list.
				if f.waitlist[hash] != nil {
					// Ignore double announcements from the same peer. This is
					// especially important if metadata is also passed along to
					// prevent malicious peers flip-flopping good/bad values.
					if _, ok := f.waitlist[hash][ann.origin]; ok {
						continue
					}
					f.waitlist[hash][ann.origin] = struct{}{}

					if waitslots := f.waitslots[ann.origin]; waitslots != nil {
						waitslots[hash] = &txMetadataWithSeq{
							txMetadata: ann.metas[i],
							seq:        nextSeq(),
						}
					} else {
						f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
							hash: {
								txMetadata: ann.metas[i],
								seq:        nextSeq(),
							},
						}
					}
					continue
				}
				// Transaction unknown to the fetcher, insert it into the waiting list
				f.waitlist[hash] = map[string]struct{}{ann.origin: {}}

				// Assign the current timestamp as the wait time, but for blob transactions,
				// skip the wait time since they are only announced.
				if ann.metas[i].kind != types.BlobTxType {
					f.waittime[hash] = f.clock.Now()
				} else {
					hasBlob = true
					f.waittime[hash] = f.clock.Now() - mclock.AbsTime(txArriveTimeout)
				}
				if waitslots := f.waitslots[ann.origin]; waitslots != nil {
					waitslots[hash] = &txMetadataWithSeq{
						txMetadata: ann.metas[i],
						seq:        nextSeq(),
					}
				} else {
					f.waitslots[ann.origin] = map[common.Hash]*txMetadataWithSeq{
						hash: {
							txMetadata: ann.metas[i],
							seq:        nextSeq(),
						},
					}
				}
			}
			// If a new item was added to the waitlist, schedule it into the fetcher
			if hasBlob || (idleWait && len(f.waittime) > 0) {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If this peer is new and announced something already queued, maybe
			// request transactions from them
			if !oldPeer && len(f.announces[ann.origin]) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, map[string]struct{}{ann.origin: {}})
			}

		case <-waitTrigger:
			// At least one transaction's waiting time ran out, push all expired
			// ones into the retrieval queues
			actives := make(map[string]struct{})
			for hash, instance := range f.waittime {
				if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
					// Transaction expired without propagation, schedule for retrieval
					if f.announced[hash] != nil {
						panic("announce tracker already contains waitlist item")
					}
					f.announced[hash] = f.waitlist[hash]
					for peer := range f.waitlist[hash] {
						if announces := f.announces[peer]; announces != nil {
							announces[hash] = f.waitslots[peer][hash]
						} else {
							f.announces[peer] = map[common.Hash]*txMetadataWithSeq{hash: f.waitslots[peer][hash]}
						}
						delete(f.waitslots[peer], hash)
						if len(f.waitslots[peer]) == 0 {
							delete(f.waitslots, peer)
						}
						actives[peer] = struct{}{}
					}
					delete(f.waittime, hash)
					delete(f.waitlist, hash)
				}
			}
			// If transactions are still waiting for propagation, reschedule the wait timer
			if len(f.waittime) > 0 {
				f.rescheduleWait(waitTimer, waitTrigger)
			}
			// If any peers became active and are idle, request transactions from them
			if len(actives) > 0 {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, actives)
			}

		case <-timeoutTrigger:
			// Clean up any expired retrievals and avoid re-requesting them from the
			// same peer (either overloaded or malicious, useless in both cases). We
			// could also penalize (Drop), but there's nothing to gain, and if could
			// possibly further increase the load on it.
			for peer, req := range f.requests {
				if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout {
					txRequestTimeoutMeter.Mark(int64(len(req.hashes)))

					// Reschedule all the not-yet-delivered fetches to alternate peers
					for _, hash := range req.hashes {
						// Skip rescheduling hashes already delivered by someone else
						if req.stolen != nil {
							if _, ok := req.stolen[hash]; ok {
								continue
							}
						}
						// Move the delivery back from fetching to queued
						if _, ok := f.announced[hash]; ok {
							panic("announced tracker already contains alternate item")
						}
						if f.alternates[hash] != nil { // nil if tx was broadcast during fetch
							f.announced[hash] = f.alternates[hash]
						}
						delete(f.announced[hash], peer)
						if len(f.announced[hash]) == 0 {
							delete(f.announced, hash)
						}
						delete(f.announces[peer], hash)
						delete(f.alternates, hash)
						delete(f.fetching, hash)
					}
					if len(f.announces[peer]) == 0 {
						delete(f.announces, peer)
					}
					// Keep track of the request as dangling, but never expire
					f.requests[peer].hashes = nil
				}
			}
			// Schedule a new transaction retrieval
			f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)

			// No idea if we scheduled something or not, trigger the timer if needed
			// TODO(karalabe): this is kind of lame, can't we dump it into scheduleFetches somehow?
			f.rescheduleTimeout(timeoutTimer, timeoutTrigger)

		case delivery := <-f.cleanup:
			// Independent if the delivery was direct or broadcast, remove all
			// traces of the hash from internal trackers. That said, compare any
			// advertised metadata with the real ones and drop bad peers.
			for i, hash := range delivery.hashes {
				if _, ok := f.waitlist[hash]; ok {
					for peer, txset := range f.waitslots {
						if meta := txset[hash]; meta != nil {
							if delivery.metas[i].kind != meta.kind {
								log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
								f.dropPeer(peer)
							} else if delivery.metas[i].size != meta.size {
								if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
									log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)

									// Normally we should drop a peer considering this is a protocol violation.
									// However, due to the RLP vs consensus format messyness, allow a few bytes
									// wiggle-room where we only warn, but don't drop.
									//
									// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
									f.dropPeer(peer)
								}
							}
						}
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.waitslots, peer)
						}
					}
					delete(f.waitlist, hash)
					delete(f.waittime, hash)
				} else {
					for peer, txset := range f.announces {
						if meta := txset[hash]; meta != nil {
							if delivery.metas[i].kind != meta.kind {
								log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
								f.dropPeer(peer)
							} else if delivery.metas[i].size != meta.size {
								if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
									log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)

									// Normally we should drop a peer considering this is a protocol violation.
									// However, due to the RLP vs consensus format messyness, allow a few bytes
									// wiggle-room where we only warn, but don't drop.
									//
									// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
									f.dropPeer(peer)
								}
							}
						}
						delete(txset, hash)
						if len(txset) == 0 {
							delete(f.announces, peer)
						}
					}
					delete(f.announced, hash)
					delete(f.alternates, hash)

					// If a transaction currently being fetched from a different
					// origin was delivered (delivery stolen), mark it so the
					// actual delivery won't double schedule it.
					if origin, ok := f.fetching[hash]; ok && (origin != delivery.origin || !delivery.direct) {
						stolen := f.requests[origin].stolen
						if stolen == nil {
							f.requests[origin].stolen = make(map[common.Hash]struct{})
							stolen = f.requests[origin].stolen
						}
						stolen[hash] = struct{}{}
					}
					delete(f.fetching, hash)
				}
			}
			// In case of a direct delivery, also reschedule anything missing
			// from the original query
			if delivery.direct {
				// Mark the requesting successful (independent of individual status)
				txRequestDoneMeter.Mark(int64(len(delivery.hashes)))

				// Make sure something was pending, nuke it
				req := f.requests[delivery.origin]
				if req == nil {
					log.Warn("Unexpected transaction delivery", "peer", delivery.origin)
					break
				}
				delete(f.requests, delivery.origin)

				// Anything not delivered should be re-scheduled (with or without
				// this peer, depending on the response cutoff)
				delivered := make(map[common.Hash]struct{})
				for _, hash := range delivery.hashes {
					delivered[hash] = struct{}{}
				}
				cutoff := len(req.hashes) // If nothing is delivered, assume everything is missing, don't retry!!!
				for i, hash := range req.hashes {
					if _, ok := delivered[hash]; ok {
						cutoff = i
					}
				}
				// Reschedule missing hashes from alternates, not-fulfilled from alt+self
				for i, hash := range req.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if req.stolen != nil {
						if _, ok := req.stolen[hash]; ok {
							continue
						}
					}
					if _, ok := delivered[hash]; !ok {
						if i < cutoff {
							delete(f.alternates[hash], delivery.origin)
							delete(f.announces[delivery.origin], hash)
							if len(f.announces[delivery.origin]) == 0 {
								delete(f.announces, delivery.origin)
							}
						}
						if len(f.alternates[hash]) > 0 {
							if _, ok := f.announced[hash]; ok {
								panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash]))
							}
							f.announced[hash] = f.alternates[hash]
						}
					}
					delete(f.alternates, hash)
					delete(f.fetching, hash)
				}
				// Something was delivered, try to reschedule requests
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
			}

		case drop := <-f.drop:
			// A peer was dropped, remove all traces of it
			if _, ok := f.waitslots[drop.peer]; ok {
				for hash := range f.waitslots[drop.peer] {
					delete(f.waitlist[hash], drop.peer)
					if len(f.waitlist[hash]) == 0 {
						delete(f.waitlist, hash)
						delete(f.waittime, hash)
					}
				}
				delete(f.waitslots, drop.peer)
				if len(f.waitlist) > 0 {
					f.rescheduleWait(waitTimer, waitTrigger)
				}
			}
			// Clean up any active requests
			var request *txRequest
			if request = f.requests[drop.peer]; request != nil {
				for _, hash := range request.hashes {
					// Skip rescheduling hashes already delivered by someone else
					if request.stolen != nil {
						if _, ok := request.stolen[hash]; ok {
							continue
						}
					}
					// Undelivered hash, reschedule if there's an alternative origin available
					delete(f.alternates[hash], drop.peer)
					if len(f.alternates[hash]) == 0 {
						delete(f.alternates, hash)
					} else {
						f.announced[hash] = f.alternates[hash]
						delete(f.alternates, hash)
					}
					delete(f.fetching, hash)
				}
				delete(f.requests, drop.peer)
			}
			// Clean up general announcement tracking
			if _, ok := f.announces[drop.peer]; ok {
				for hash := range f.announces[drop.peer] {
					delete(f.announced[hash], drop.peer)
					if len(f.announced[hash]) == 0 {
						delete(f.announced, hash)
					}
				}
				delete(f.announces, drop.peer)
			}
			// If a request was cancelled, check if anything needs to be rescheduled
			if request != nil {
				f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)
				f.rescheduleTimeout(timeoutTimer, timeoutTrigger)
			}

		case <-f.quit:
			return
		}
		// No idea what happened, but bump some sanity metrics
		txFetcherWaitingPeers.Update(int64(len(f.waitslots)))
		txFetcherWaitingHashes.Update(int64(len(f.waitlist)))
		txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests)))
		txFetcherQueueingHashes.Update(int64(len(f.announced)))
		txFetcherFetchingPeers.Update(int64(len(f.requests)))
		txFetcherFetchingHashes.Update(int64(len(f.fetching)))

		// Loop did something, ping the step notifier if needed (tests)
		if f.step != nil {
			f.step <- struct{}{}
		}
	}
}

// rescheduleWait iterates over all the transactions currently in the waitlist
// and schedules the movement into the fetcher for the earliest.
//
// The method has a granularity of 'txGatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, instance := range f.waittime {
		if earliest > instance {
			earliest = instance
			if txArriveTimeout-time.Duration(now-earliest) < txGatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}

// rescheduleTimeout iterates over all the transactions currently in flight and
// schedules a cleanup run when the first would trigger.
//
// The method has a granularity of 'txGatherSlack', since there's not much point in
// spinning over all the transactions just to maybe find one that should trigger
// a few ms earlier.
//
// This method is a bit "flaky" "by design". In theory the timeout timer only ever
// should be rescheduled if some request is pending. In practice, a timeout will
// cause the timer to be rescheduled every 5 secs (until the peer comes through or
// disconnects). This is a limitation of the fetcher code because we don't trac
// pending requests and timed out requests separately. Without double tracking, if
// we simply didn't reschedule the timer on all-timeout then the timer would never
// be set again since len(request) > 0 => something's running.
func (f *TxFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct{}) {
	if *timer != nil {
		(*timer).Stop()
	}
	now := f.clock.Now()

	earliest := now
	for _, req := range f.requests {
		// If this request already timed out, skip it altogether
		if req.hashes == nil {
			continue
		}
		if earliest > req.time {
			earliest = req.time
			if txFetchTimeout-time.Duration(now-earliest) < txGatherSlack {
				break
			}
		}
	}
	*timer = f.clock.AfterFunc(txFetchTimeout-time.Duration(now-earliest), func() {
		trigger <- struct{}{}
	})
}

// scheduleFetches starts a batch of retrievals for all available idle peers.
func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, whitelist map[string]struct{}) {
	// Gather the set of peers we want to retrieve from (default to all)
	actives := whitelist
	if actives == nil {
		actives = make(map[string]struct{})
		for peer := range f.announces {
			actives[peer] = struct{}{}
		}
	}
	if len(actives) == 0 {
		return
	}
	// For each active peer, try to schedule some transaction fetches
	idle := len(f.requests) == 0

	f.forEachPeer(actives, func(peer string) {
		if f.requests[peer] != nil {
			return // continue in the for-each
		}
		if len(f.announces[peer]) == 0 {
			return // continue in the for-each
		}
		var (
			hashes = make([]common.Hash, 0, maxTxRetrievals)
			bytes  uint64
		)
		f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta txMetadata) bool {
			// If the transaction is already fetching, skip to the next one
			if _, ok := f.fetching[hash]; ok {
				return true
			}
			// Mark the hash as fetching and stash away possible alternates
			f.fetching[hash] = peer

			if _, ok := f.alternates[hash]; ok {
				panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
			}
			f.alternates[hash] = f.announced[hash]
			delete(f.announced, hash)

			// Accumulate the hash and stop if the limit was reached
			hashes = append(hashes, hash)
			if len(hashes) >= maxTxRetrievals {
				return false // break in the for-each
			}
			bytes += uint64(meta.size)
			return bytes < maxTxRetrievalSize
		})
		// If any hashes were allocated, request them from the peer
		if len(hashes) > 0 {
			f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()}
			txRequestOutMeter.Mark(int64(len(hashes)))
			p := peer
			gopool.Submit(func() {
				// Try to fetch the transactions, but in case of a request
				// failure (e.g. peer disconnected), reschedule the hashes.
				if err := f.fetchTxs(p, hashes); err != nil {
					txRequestFailMeter.Mark(int64(len(hashes)))
					f.Drop(p)
				}
			})
		}
	})
	// If a new request was fired, schedule a timeout timer
	if idle && len(f.requests) > 0 {
		f.rescheduleTimeout(timer, timeout)
	}
}

// forEachPeer does a range loop over a map of peers in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string)) {
	// If we're running production, use whatever Go's map gives us
	if f.rand == nil {
		for peer := range peers {
			do(peer)
		}
		return
	}
	// We're running the test suite, make iteration deterministic
	list := make([]string, 0, len(peers))
	for peer := range peers {
		list = append(list, peer)
	}
	sort.Strings(list)
	rotateStrings(list, f.rand.Intn(len(list)))
	for _, peer := range list {
		do(peer)
	}
}

// forEachAnnounce loops over the given announcements in arrival order, invoking
// the do function for each until it returns false. We enforce an arrival
// ordering to minimize the chances of transaction nonce-gaps, which result in
// transactions being rejected by the txpool.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadataWithSeq, do func(hash common.Hash, meta txMetadata) bool) {
	type announcement struct {
		hash common.Hash
		meta txMetadata
		seq  uint64
	}
	// Process announcements by their arrival order
	list := make([]announcement, 0, len(announces))
	for hash, entry := range announces {
		list = append(list, announcement{hash: hash, meta: entry.txMetadata, seq: entry.seq})
	}
	sort.Slice(list, func(i, j int) bool {
		return list[i].seq < list[j].seq
	})
	for i := range list {
		if !do(list[i].hash, list[i].meta) {
			return
		}
	}
}

// rotateStrings rotates the contents of a slice by n steps. This method is only
// used in tests to simulate random map iteration but keep it deterministic.
func rotateStrings(slice []string, n int) {
	orig := make([]string, len(slice))
	copy(orig, slice)

	for i := 0; i < len(orig); i++ {
		slice[i] = orig[(i+n)%len(orig)]
	}
}
